package com.atguigu.flink.watermark;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Created by Smexy on 2023/11/14
 *
 * <p>Watermarks can be generated at the source, or at an intermediate step of the pipeline.
 * To generate them at an intermediate step, call
 * {@code assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)} on the stream.
 *
 * <p>This demo reads text lines from a socket, maps them to {@link WaterSensor} records,
 * assigns timestamps and watermarks in the middle of the pipeline, and then prints, for every
 * record, its timestamp, the operator's current watermark and the current processing time.
 */
public class Demo1_CreateWaterMark
{
    /** Host of the socket text source. */
    private static final String SOURCE_HOST = "hadoop102";

    /** Port of the socket text source. */
    private static final int SOURCE_PORT = 8888;

    /** Value written to the {@code rest.port} configuration entry. */
    private static final int REST_PORT = 3333;

    /** Period of the automatic watermark emission, passed to {@code setAutoWatermarkInterval}. */
    private static final long AUTO_WATERMARK_INTERVAL_MS = 3000L;

    /**
     * Builds the demo pipeline and runs it.
     *
     * @param args command line arguments (not used)
     * @throws IllegalStateException if the job execution fails; the original failure is kept
     *                               as the cause so that it is not silently lost
     */
    public static void main(String[] args) {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", REST_PORT);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.setParallelism(1);
        env.disableOperatorChaining();
        // Adjust the period at which watermarks are emitted automatically.
        env.getConfig().setAutoWatermarkInterval(AUTO_WATERMARK_INTERVAL_MS);

        SingleOutputStreamOperator<WaterSensor> ds = env
            .socketTextStream(SOURCE_HOST, SOURCE_PORT)
            .map(new WaterSensorMapFunction());

        // Extract the timestamp (milliseconds) carried by the record itself.
        SerializableTimestampAssigner<WaterSensor> timestampAssigner =
            (element, recordTimestamp) -> element.getTs();

        // Build the watermark strategy.
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
            .<WaterSensor>forMonotonousTimestamps()
            .withTimestampAssigner(timestampAssigner);

        ds
            // Generate watermarks here and broadcast them downstream.
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .process(new ProcessFunction<WaterSensor, WaterSensor>()
            {
                @Override
                public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                    // Timestamp attached to the current record.
                    System.out.println("当前数据的时间属性:" + ctx.timestamp());

                    TimerService timerService = ctx.timerService();
                    // Watermark (event-time clock) of this operator when the record arrives.
                    System.out.println("当前数据到达时，算子的水印(时钟):" + timerService.currentWatermark());
                    // Processing time reported by the timer service when the record arrives.
                    System.out.println("当前数据到达时，计算机的物理时钟，processingTime：" + timerService.currentProcessingTime());

                    // Forward the record unchanged.
                    out.collect(value);
                }
            })
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Do not swallow the failure: rethrow so the stack trace is reported and the
            // JVM terminates with a non-zero exit status instead of returning normally.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }
}
